Chip - Expose option to batch emit messages#1925
Conversation
fb2696b to
4b8a841
Compare
✅ API Diff Results -
|
8a60dae to
1f62639
Compare
80c1852 to
b1e8399
Compare
b1e8399 to
56a1191
Compare
|
This PR is stale because it has been open 30 days with no activity. |
|
There was a problem hiding this comment.
Pull request overview
Adds batched Chip Ingress publishing controls/results to the protobuf contract and threads a new BatchEmit API through beholder, so callers can send multiple messages and optionally request non-atomic batch behavior. This fits into the existing telemetry/chip-ingress integration by extending both the wire format and the emitter abstraction.
Changes:
- Extend Chip Ingress protobuf types with
PublishOptions, per-result errors, and batch options. - Add
BatchEmitsupport across beholder emitters, including chip-ingress and dual-source emitters. - Update tests/mocks/helpers and wire the root module to the local
pkg/chipingressmodule during development.
Reviewed changes
Copilot reviewed 14 out of 15 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
pkg/chipingress/types.go |
Re-exports new protobuf PublishOptions type. |
pkg/chipingress/pb/chip_ingress.proto |
Adds batch publish options and per-item error fields to the RPC schema. |
pkg/chipingress/pb/chip_ingress.pb.go |
Regenerated Go protobuf bindings for the updated schema. |
pkg/chipingress/go.mod |
Promotes google.rpc status dependency to a direct requirement. |
pkg/beholder/noop.go |
Adds a no-op BatchEmit implementation. |
pkg/beholder/message.go |
Extends attribute parsing to flatten nested []any inputs. |
pkg/beholder/message_emitter.go |
Refactors single-message emit through new batch API. |
pkg/beholder/dual_source_emitter.go |
Adds batch emit path for OTLP + chip-ingress dual export. |
pkg/beholder/dual_source_emitter_test.go |
Updates test mock emitter to satisfy new batch interface. |
pkg/beholder/client.go |
Introduces public batch emit options and expands Emitter interface. |
pkg/beholder/chip_ingress_emitter.go |
Implements batch publishing via PublishBatch with options forwarding. |
pkg/beholder/chip_ingress_emitter_test.go |
Switches existing emitter tests to PublishBatch expectations. |
pkg/beholder/beholdertest/beholder.go |
Updates test helper emitter with batch support. |
go.sum |
Removes checksums for replaced external pkg/chipingress dependency. |
go.mod |
Replaces external pkg/chipingress dependency with local module path. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // ExtractSourceAndType extracts source domain and entity from the attributes | ||
| func ExtractSourceAndType(attrKVs ...any) (string, string, error) { | ||
| attributes := newAttributes(attrKVs...) | ||
|
|
||
| func ExtractSourceAndType(attributes Attributes) (string, string, error) { |
| go func(ctx context.Context) { | ||
| defer d.wg.Done() | ||
| var cancel context.CancelFunc | ||
| ctx, cancel = d.stopCh.Ctx(ctx) | ||
| defer cancel() | ||
|
|
||
| if err := d.chipIngressEmitter.Emit(ctx, body, attrKVs...); err != nil { | ||
| if _, err := d.chipIngressEmitter.BatchEmit(ctx, messages, options...); err != nil { | ||
| // If the chip ingress emitter fails, we ONLY log the error | ||
| // because we still want to send the data to the OTLP collector and not cause disruption | ||
| d.log.Infof("failed to emit to chip ingress: %v", err) | ||
| } | ||
| }(context.WithoutCancel(ctx)) |
| // PublishOptions controls optional behaviour of PublishBatch. | ||
| message PublishOptions { | ||
| // allOrNothing makes the batch atomic: either all events are committed or none are. | ||
| // When unset, the server defaults to true (preserving the original atomic behaviour). | ||
| // Set to false to allow partial success, where individual results carry per-event errors. | ||
| optional bool allOrNothing = 1; |
| eventPb.Options = &chipingress.PublishOptions{ | ||
| AllOrNothing: proto.Bool(emitOpts.AllOrNothing), | ||
| } | ||
|
|
||
| _, err = c.client.Publish(ctx, eventPb) | ||
| response, err := c.client.PublishBatch(ctx, eventPb) |
| case []any: | ||
| // Treat a []any element as if its contents were passed directly. | ||
| maps.Copy(a, newAttributes(t...)) | ||
| i++ |
| replace github.com/smartcontractkit/chainlink-common/pkg/chipingress => ./pkg/chipingress | ||
|
|
2be3605 to
6e51bea
Compare
CRE-3936